package cn.edu.nju

import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by thpffcj on 2019/10/31.
 *
 * Walk-through of the basic GraphX operators on a small, hard-coded property graph:
 * vertex/edge/triplet filtering, degree statistics, attribute transformations,
 * sub-graph extraction, joins against degree RDDs and neighbourhood aggregation.
 * Every step prints its result to standard output.
 */
object GraphExample2 {

  /**
   * Vertex attribute used by the join/aggregation examples.
   *
   * Declared at object level (not inside a method) so that instances shipped inside
   * Spark closures carry no reference to an enclosing method scope.
   */
  private final case class User(name: String, age: Int, inDeg: Int, outDeg: Int)

  /**
   * Program entry point: creates the Spark context, runs every example and always
   * stops the context afterwards.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    // Default to a local master, but let an externally supplied spark.master
    // (e.g. from spark-submit) take precedence.
    val conf = new SparkConf()
      .setAppName("GraphExample2")
      .setIfMissing("spark.master", "local")
    val sc = new SparkContext(conf)

    try {
      val graph = buildGraph(sc)
      printBasicQueries(graph)
      printMaxDegrees(graph)
      printTransformations(graph)
      printSubgraph(graph)
      printJoinsAndAggregation(graph)
    } finally {
      // Release the context even when one of the examples fails.
      sc.stop()
    }
  }

  /** Builds the sample graph: vertices carry (name, age), edges carry an Int weight. */
  private def buildGraph(sc: SparkContext): Graph[(String, Int), Int] = {
    // Vertex attribute type VD is (String, Int).
    val vertexArray = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50))
    )

    // Edge attribute type ED is Int.
    val edgeArray = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 3),
      Edge(4L, 1L, 1),
      Edge(5L, 2L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3)
    )

    val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)

    Graph(vertexRDD, edgeRDD)
  }

  /** Prints vertices older than 30, edges with attribute > 5 and the triplets of those edges. */
  private def printBasicQueries(graph: Graph[(String, Int), Int]): Unit = {
    // Vertices whose age is greater than 30.
    graph.vertices
      .filter { case (_, (_, age)) => age > 30 }
      .collect()
      .foreach { case (_, (name, age)) => println(s"$name is $age") }

    // Edges whose attribute is greater than 5.
    graph.edges
      .filter(_.attr > 5)
      .collect()
      .foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))

    // Triplets expose the source and destination vertex attributes next to the edge attribute.
    graph.triplets
      .filter(_.attr > 5)
      .collect()
      .foreach(t => println(s"${t.srcAttr._1} likes ${t.dstAttr._1}"))
  }

  /** Returns whichever (vertexId, degree) pair has the larger degree; `b` wins ties. */
  private def maxByDegree(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
    if (a._2 > b._2) a else b

  /** Prints the vertex with the largest out-degree, in-degree and total degree. */
  private def printMaxDegrees(graph: Graph[(String, Int), Int]): Unit = {
    val maxOut = graph.outDegrees.reduce(maxByDegree)
    val maxIn = graph.inDegrees.reduce(maxByDegree)
    val maxTotal = graph.degrees.reduce(maxByDegree)
    println(s"max of outDegrees:$maxOut max of inDegrees:$maxIn max of Degrees:$maxTotal")
  }

  /** Prints the graph after adding 10 to every age, then after doubling every edge attribute. */
  private def printTransformations(graph: Graph[(String, Int), Int]): Unit = {
    // mapVertices expects the NEW ATTRIBUTE only; the vertex id is kept by GraphX itself.
    graph
      .mapVertices { case (_, (name, age)) => (name, age + 10) }
      .vertices
      .collect()
      .foreach { case (_, (name, age)) => println(s"$name is $age") }

    graph
      .mapEdges(e => e.attr * 2)
      .edges
      .collect()
      .foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  }

  /** Prints the vertices and edges of the sub-graph induced by vertices older than 30. */
  private def printSubgraph(graph: Graph[(String, Int), Int]): Unit = {
    // Same "older than 30" threshold as the vertex filter in printBasicQueries.
    val subGraph = graph.subgraph(vpred = (_, attr) => attr._2 > 30)

    subGraph.vertices
      .collect()
      .foreach { case (_, (name, age)) => println(s"$name is $age") }

    // Only edges whose two endpoints both satisfy the vertex predicate remain.
    subGraph.edges
      .collect()
      .foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  }

  /**
   * Annotates every vertex with its in/out degree, prints the result, lists the users
   * whose in-degree equals their out-degree and reports each user's oldest follower.
   */
  private def printJoinsAndAggregation(graph: Graph[(String, Int), Int]): Unit = {
    // Convert the vertex attribute to User with both degrees initialised to 0.
    val initialUserGraph: Graph[User, Int] =
      graph.mapVertices { case (_, (name, age)) => User(name, age, 0, 0) }

    // Vertices missing from a degree RDD have degree 0, hence getOrElse(0).
    val userGraph: Graph[User, Int] = initialUserGraph
      .outerJoinVertices(initialUserGraph.inDegrees) {
        case (_, u, inDegOpt) => u.copy(inDeg = inDegOpt.getOrElse(0))
      }
      .outerJoinVertices(initialUserGraph.outDegrees) {
        case (_, u, outDegOpt) => u.copy(outDeg = outDegOpt.getOrElse(0))
      }

    userGraph.vertices
      .collect()
      .foreach { case (_, u) => println(s"${u.name} inDeg: ${u.inDeg}  outDeg: ${u.outDeg}") }

    // Users whose in-degree equals their out-degree.
    userGraph.vertices
      .filter { case (_, u) => u.inDeg == u.outDeg }
      .collect()
      .foreach { case (_, u) => println(u.name) }

    // Oldest follower of every user. aggregateMessages replaces the mapReduceTriplets
    // call this example was originally written with.
    val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages[(String, Int)](
      // Send the source vertex's (name, age) to the destination vertex.
      ctx => ctx.sendToDst((ctx.srcAttr.name, ctx.srcAttr.age)),
      // Keep the older of two candidate followers.
      (a, b) => if (a._2 > b._2) a else b
    )

    userGraph.vertices
      .leftJoin(oldestFollower) { (_, user, optOldestFollower) =>
        optOldestFollower match {
          case None => s"${user.name} does not have any followers."
          case Some((name, _)) => s"${name} is the oldest follower of ${user.name}."
        }
      }
      .collect()
      .foreach { case (_, message) => println(message) }
  }
}
